package day03.window;

import beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

/**
 * Flink stream-processing API - other optional window APIs.
 *
 * @author lvbingbing
 * @date 2022-01-02 13:05
 */
public class FlinkWindow04 {
    public static void main(String[] args) throws Exception {
        // 1. Create the FlinkWindow00 helper. Per the original note, its parameterized
        //    constructor initializes the environment and reads data from a socket text stream.
        int parallelism = 1;
        FlinkWindow00 flinkWindow = new FlinkWindow00(parallelism);
        // 2. Obtain the execution environment.
        StreamExecutionEnvironment env = flinkWindow.getEnv();
        // 3. Wire up the optional window APIs being studied.
        studyAnotherApi(flinkWindow.getSingleOutputStreamOperator());
        // 4. Trigger job execution.
        env.execute();
    }

    /**
     * Demonstrates the other optional window APIs:
     * <ol>
     *   <li>{@code trigger()}</li>
     *   <li>{@code evictor()}</li>
     *   <li>{@code allowedLateness()}</li>
     *   <li>{@code sideOutputLateData()}</li>
     *   <li>{@code getSideOutput()}</li>
     * </ol>
     * The stream is keyed by {@code id}, grouped into 15-second windows with an allowed
     * lateness of one minute, and summed on {@code temperature}. Records that arrive too
     * late are routed to the {@code "late"} side output, which is the only stream printed
     * here; the summed main stream is not attached to a sink by this method.
     *
     * @param sensorReadingStream the sensor reading stream to window
     */
    private static void studyAnotherApi(SingleOutputStreamOperator<SensorReading> sensorReadingStream) {
        // The tag MUST be created as an anonymous subclass (note the trailing "{}") so that
        // Flink can recover the element type from the generic superclass. A plain
        // "new OutputTag<>(...)" fails with InvalidTypesException when the job graph is built.
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
        };
        SingleOutputStreamOperator<SensorReading> sumStream = sensorReadingStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                // Optional hooks, intentionally left disabled in this example:
//                .trigger()
//                .evictor()
                .allowedLateness(Time.minutes(1))
                .sideOutputLateData(outputTag)
                .sum("temperature");

        // Retrieve the late records that were diverted to the side output and print them.
        DataStream<SensorReading> sideOutputStream = sumStream.getSideOutput(outputTag);
        sideOutputStream.print("sideOutputStream()");
    }
}
